use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
};

use futures_util::{SinkExt, StreamExt};
use tokio::sync::{
    mpsc::{self, UnboundedSender},
    RwLock,
};
use warp::{filters::ws::Message};

type Topic = String;
type Event = Vec<u8>;
type WsSender = UnboundedSender<Message>;

/// In-memory publish/subscribe broker keyed by topic.
///
/// Both maps are wrapped in `Arc<RwLock<..>>` so they can be accessed from
/// async code through a shared reference.
pub struct Broker {
    /// Events stored for each topic, in the order they were produced.
    events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,
    /// Subscribers tracked for each topic, as the sending halves of their channels.
    subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,
}
impl Broker {
    /// Creates an empty broker with no stored events and no subscribers.
    pub fn new() -> Self {
        Self {
            events: Arc::new(RwLock::new(HashMap::new())),
            subscribers: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Publishes `event` on `topic`.
    ///
    /// The event is handed to every subscriber currently registered for the
    /// topic as a binary WebSocket message, and is then appended to the
    /// topic's event queue.
    ///
    /// Subscribers whose channel can no longer accept messages (their
    /// receiving side has been dropped) are removed from the topic, and a
    /// topic left without subscribers is removed from the subscriber map, so
    /// disconnected clients do not accumulate.
    pub async fn produce(&self, topic: Topic, event: Event) {
        // The events write lock is held for the whole call so that concurrent
        // producers cannot interleave: the order in which events are stored is
        // the order in which they are fanned out.
        let mut events = self.events.write().await;

        {
            let mut subscribers = self.subscribers.write().await;
            let topic_is_empty = match subscribers.get_mut(&topic) {
                Some(senders) => {
                    // Sending on an unbounded channel does not wait; a failed
                    // send means the receiver is gone, so that sender is dropped.
                    senders.retain(|tx| tx.send(Message::binary(event.clone())).is_ok());
                    senders.is_empty()
                }
                None => false,
            };
            if topic_is_empty {
                subscribers.remove(&topic);
            }
        }

        // Store last so the event can be moved instead of cloned. Nothing can
        // observe the gap because the events write lock is still held.
        events.entry(topic).or_default().push_back(event);
    }

    /// Registers `socket` as a subscriber of `topic`.
    ///
    /// A channel is created for the subscriber; its sending half is stored
    /// under `topic`, and a spawned task forwards every message received on
    /// the channel to the WebSocket. The task ends when the channel is closed
    /// or when writing to the WebSocket fails.
    ///
    /// Only the write half of the socket is used. The read half is not
    /// polled, so frames sent by the client are never read.
    pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
        let (mut ws_sender, _ws_receiver) = socket.split();
        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();

        // The write guard is a temporary and is released at the end of this statement.
        self.subscribers
            .write()
            .await
            .entry(topic)
            .or_default()
            .push(tx);

        tokio::task::spawn(async move {
            while let Some(msg) = rx.recv().await {
                // A failed write means the connection is unusable. Stop here
                // so `rx` is dropped; later sends on the stored sender then
                // fail instead of queueing messages nobody will deliver.
                if ws_sender.send(msg).await.is_err() {
                    break;
                }
            }
        });
    }
}
